/*
 * Copyright (c) 2005, salesforce.com, inc.
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without modification, are permitted provided 
 * that the following conditions are met:
 * 
 *    Redistributions of source code must retain the above copyright notice, this list of conditions and the 
 *    following disclaimer.
 *  
 *    Redistributions in binary form must reproduce the above copyright notice, this list of conditions and 
 *    the following disclaimer in the documentation and/or other materials provided with the distribution. 
 *    
 *    Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or 
 *    promote products derived from this software without specific prior written permission.
 *  
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED 
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 * POSSIBILITY OF SUCH DAMAGE.
 */
package com.salesforce.dataloader.process;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.log4j.Logger;

import com.salesforce.dataloader.ConfigGenerator;
import com.salesforce.dataloader.ConfigTestBase;
import com.salesforce.dataloader.TestBase;
import com.salesforce.dataloader.config.Config;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.dao.DataAccessObjectFactory;
import com.salesforce.dataloader.dao.DataReader;
import com.salesforce.dataloader.dao.csv.CSVFileReader;
import com.salesforce.dataloader.dao.csv.CSVFileWriter;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.exception.ProcessInitializationException;
import com.sforce.soap.partner.DeleteResult;
import com.sforce.soap.partner.QueryResult;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.UpsertResult;
import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;

/**
 * Base class for batch process tests
 * 
 * @author awarshavsky
 * @since before
 */
abstract public class ProcessTestBase extends ConfigTestBase {

	/**
	 * @return the {@code DEFAULT_CONFIG_GEN} configuration generator
	 */
	public static ConfigGenerator getConfigGenerator() {
		return DEFAULT_CONFIG_GEN;
	}

	/**
	 * Creates a process test, forwarding both arguments to the superclass
	 * constructor.
	 * 
	 * @param name
	 *            test name
	 * @param config
	 *            configuration entries passed on to the superclass
	 */
	protected ProcessTestBase(String name, Map<String, String> config) {
		super(name, config);
	}

	/**
	 * Creates a process test, forwarding the name to the superclass
	 * constructor.
	 * 
	 * @param name
	 *            test name
	 */
	protected ProcessTestBase(String name) {
		super(name);
	}

	// Logger for this class. It is registered under ProcessTestBase (not
	// TestBase) so that log output is attributed to the class that emits it.
	private static final Logger logger = Logger.getLogger(ProcessTestBase.class);

	/**
	 * Runs the superclass set-up and then deletes the Account and Contact
	 * records matching the test where-clauses (see {@link #cleanRecords()}).
	 */
	@Override
	public void setUp() {
		super.setUp();
		cleanRecords();
	}

	/**
	 * Deletes the test records and then runs the superclass tear-down. The
	 * superclass tear-down runs even when the record cleanup throws.
	 */
	@Override
	public void tearDown() throws Exception {
		try {
			cleanRecords();
		} finally {
			super.tearDown();
		}
	}

	/**
	 * Deletes every Account matching {@code ACCOUNT_WHERE_CLAUSE} and every
	 * Contact matching {@code CONTACT_WHERE_CLAUSE}.
	 */
	private void cleanRecords() {
		// clean up the records that might have been created by previous tests
		deleteSfdcRecords("Account", ACCOUNT_WHERE_CLAUSE, 0);
		deleteSfdcRecords("Contact", CONTACT_WHERE_CLAUSE, 0);
	}

	/**
	 * Asserts that the error output file of the given controller contains no
	 * rows. Shorthand for {@code verifyErrors(theController, 0)}.
	 * 
	 * @param theController
	 *            controller whose configured error file is checked
	 */
	protected void verifyNoError(Controller theController) {
		verifyErrors(theController, 0);
	}

	/**
	 * Asserts that the error output file configured for the controller
	 * ({@code Config.OUTPUT_ERROR}) contains exactly the expected number of
	 * rows. The test fails if the file cannot be read.
	 * 
	 * @param theController
	 *            controller whose configured error file is checked
	 * @param expectedErrors
	 *            number of rows the error file must contain
	 */
	protected void verifyErrors(Controller theController, int expectedErrors) {
		final String errorFile = theController.getConfig().getString(
				Config.OUTPUT_ERROR);
		final CSVFileReader reader = new CSVFileReader(errorFile);
		try {
			assertEquals(
					"Didn't get the expected number of errors in the error file: "
							+ errorFile, expectedErrors, reader.getTotalRows());
		} catch (DataAccessObjectException e) {
			fail("Error accessing error output file: " + errorFile);
		} finally {
			// the reader is always constructed above, so no null check needed
			reader.close();
		}
	}

	/**
	 * Asserts that every row of the error output file configured for the
	 * controller ({@code Config.OUTPUT_ERROR}) contains the expected error
	 * message. An error file with no rows passes without any assertion being
	 * made. The test fails if the file cannot be read.
	 * 
	 * @param theController
	 *            controller whose configured error file is checked
	 * @param expectedErrorMessage
	 *            text that the string form of each error row must contain
	 */
	protected void verifyErrors(Controller theController,
			String expectedErrorMessage) {
		final String errorFile = theController.getConfig().getString(
				Config.OUTPUT_ERROR);
		final CSVFileReader reader = new CSVFileReader(errorFile);
		try {
			final int rowCount = reader.getTotalRows();
			// a single loop covers both the one-row and the many-row case
			for (int row = 0; row < rowCount; row++) {
				String rowText = reader.readRow().toString();
				assertTrue("Wrong Error Message thrown to the user", rowText
						.contains(expectedErrorMessage));
			}
		} catch (DataAccessObjectException e) {
			fail("Error accessing output file: " + errorFile);
		} finally {
			reader.close();
		}
	}

	/**
	 * Verifies the success output of the controller without comparing ids
	 * against an input file. Shorthand for
	 * {@code verifyAllSuccess(theController, null)}.
	 * 
	 * @param theController
	 *            controller whose success output is checked
	 */
	protected void verifyAllSuccess(Controller theController) {
		verifyAllSuccess(theController, null);
	}

	/**
	 * Verifies the success output file of the controller in three steps: the
	 * number of success rows (at least as many as the controller's DAO has
	 * rows), the status text of every row (which must match the configured
	 * operation), and, when an input file is given and rows are expected, the
	 * ids against that input file.
	 * 
	 * @param theController
	 *            controller whose success output is checked
	 * @param inputFileName
	 *            input csv file to compare ids against, or null to skip the id
	 *            comparison
	 */
	protected void verifyAllSuccess(Controller theController,
			String inputFileName) {
		String successFileName = theController.getConfig().getString(
				Config.OUTPUT_SUCCESS);

		// the DAO row count is the number of successes we expect to see
		int expectedSuccesses = 0;
		try {
			DataReader inputDao = (DataReader) theController.getDao();
			expectedSuccesses = inputDao.getTotalRows();
		} catch (DataAccessObjectException e) {
			fail("Error getting total rows from DAO");
		}
		verifyAllSuccess(successFileName, expectedSuccesses);

		// status text(s) that the configured operation is allowed to produce
		String[] allowedStatuses;
		switch (theController.getConfig().getOperationInfo()) {
		case insert:
			allowedStatuses = new String[] { "Item Created" };
			break;
		case update:
			allowedStatuses = new String[] { "Item Updated" };
			break;
		case upsert:
			allowedStatuses = new String[] { "Item Updated", "Item Created" };
			break;
		case delete:
			allowedStatuses = new String[] { "Item Deleted" };
			break;
		case hard_delete:
			allowedStatuses = new String[] { "Item Hard Deleted" };
			break;
		default:
			allowedStatuses = null;
			fail("got invalid operation and don't know what to do");
			break;
		}
		verifyAllSuccess(successFileName, allowedStatuses);

		// the id comparison only makes sense with an input file and some rows
		if (inputFileName == null || expectedSuccesses == 0) {
			return;
		}
		verifyAllSuccessIds(successFileName, inputFileName, expectedSuccesses,
				0);
	}

	/**
	 * Asserts that the success file contains at least the expected number of
	 * rows and that every row carries a non-null, non-empty "ID" value. The
	 * test fails if the file cannot be read.
	 * 
	 * @param successFileName
	 *            path of the success csv file
	 * @param expectedSuccesses
	 *            minimum number of rows the success file must contain
	 */
	protected void verifyAllSuccess(String successFileName,
			int expectedSuccesses) {
		final CSVFileReader reader = new CSVFileReader(successFileName);
		try {
			final int actualSuccesses = reader.getTotalRows();
			if (actualSuccesses < expectedSuccesses) {
				fail("Found successes: " + actualSuccesses + ", expected: "
						+ expectedSuccesses
						+ ".  For details please see the successes file: "
						+ successFileName);
			}

			for (Map<String, Object> row : reader.readRowList(actualSuccesses)) {
				Object id = row.get("ID");
				assertNotNull(
						"Each output row should have a non-null Id specified",
						id);
				assertFalse(
						"Each output row should have a non-empty Id specified",
						id.toString().length() == 0);
			}
		} catch (DataAccessObjectException e) {
			fail("Error accessing success output file: " + successFileName);
		} finally {
			reader.close();
		}
	}

	/**
	 * Asserts that the "STATUS" value of every row in the success file equals
	 * one of the expected status strings. The test fails on the first row that
	 * matches none of them, or if the file cannot be read.
	 * 
	 * @param successFileName
	 *            path of the success csv file
	 * @param expectedStatus
	 *            status strings that are acceptable for the operation
	 */
	protected void verifyAllSuccess(String successFileName,
			String... expectedStatus) {
		final CSVFileReader reader = new CSVFileReader(successFileName);
		try {
			final int rowCount = reader.getTotalRows();
			for (Map<String, Object> row : reader.readRowList(rowCount)) {
				final Object actualStatus = row.get("STATUS");

				boolean matched = false;
				for (int i = 0; i < expectedStatus.length && !matched; i++) {
					matched = expectedStatus[i].equals(actualStatus);
				}
				if (matched) {
					continue;
				}

				// no acceptable status matched: report all candidates
				StringBuilder message = new StringBuilder(
						"Status does not match operation.\nexpected:");
				for (String candidate : expectedStatus) {
					message.append(candidate).append(",");
				}
				message.append("\nactual: ").append(actualStatus);
				fail(message.toString());
			}
		} catch (DataAccessObjectException e) {
			fail("Error accessing success output file: " + successFileName);
		} finally {
			reader.close();
		}
	}

	/**
	 * Verifies that the "ID" values in the success csv file match, row by row,
	 * the "ID" values in the input csv file. Input rows are read starting at
	 * startIndex and success rows starting at 0; comparison stops once the
	 * input index reaches expectedSuccess or the success rows are exhausted.
	 * The test fails if either file cannot be read.
	 * 
	 * @param successFileName
	 *            path of the success csv file
	 * @param inputFileName
	 *            path of the input csv file
	 * @param expectedSuccess
	 *            exclusive upper bound for the input row index
	 * @param startIndex
	 *            index of the first input row to compare
	 */
	protected void verifyAllSuccessIds(String successFileName,
			String inputFileName, int expectedSuccess, int startIndex) {
		CSVFileReader successReader = new CSVFileReader(successFileName);
		CSVFileReader inputReader = new CSVFileReader(inputFileName);
		try {
			int successRows = successReader.getTotalRows();
			List<Map<String, Object>> successDataRows = successReader
					.readRowList(successRows);
			int inputRows = inputReader.getTotalRows();
			List<Map<String, Object>> inputDataRows = inputReader
					.readRowList(inputRows);

			// NOTE: Works when the input file either has all valid ids or has
			// invalid ids followed by valid ids starting at start index.
			// Does not work for files that alternate valid and invalid ids.
			for (int i = startIndex, j = 0; i < expectedSuccess
					&& j < successRows; i++, j++) {
				assertEquals("Id value does not match actual record id",
						inputDataRows.get(i).get("ID"), successDataRows.get(j)
								.get("ID"));
			}
		} catch (DataAccessObjectException e) {
			fail("Error accessing success output file: " + successFileName);
		} finally {
			// close BOTH readers; the nested finally guarantees the input
			// reader is released even if closing the success reader throws
			try {
				successReader.close();
			} finally {
				inputReader.close();
			}
		}
	}

	/**
	 * Upserts numRecords records of the given entity and returns whatever the
	 * entity-specific upsert helper returns.
	 * 
	 * @param entityName
	 *            "Account" or "Contact" (case-insensitive)
	 * @param numRecords
	 *            number of records to upsert
	 * @return String[] result of the account or contact upsert helper
	 * @throws IllegalArgumentException
	 *             if the entity name is neither Account nor Contact
	 */
	protected String[] upsertSfdcRecords(String entityName, int numRecords) {
		if (entityName.equalsIgnoreCase("Account")) {
			return upsertSfdcAccounts(numRecords);
		}
		if (entityName.equalsIgnoreCase("Contact")) {
			return upsertSfdcContacts(numRecords);
		}
		throw new IllegalArgumentException("Unexpected entity name: "
				+ entityName);
	}

	/**
	 * Upserts numRecords accounts, starting at sequence number 0. Shorthand
	 * for {@code upsertSfdcAccounts(numRecords, 0)}.
	 * 
	 * @param numRecords
	 *            number of accounts to upsert
	 * @return String[] result of {@code upsertSfdcAccounts(numRecords, 0)}
	 */
	protected String[] upsertSfdcAccounts(int numRecords) {
		return upsertSfdcAccounts(numRecords, 0);
	}

	/**
	 * Upserts numRecords valid contacts, starting at sequence number 0. The
	 * save is requested with output ignored.
	 * 
	 * @param numRecords
	 *            number of contacts to upsert
	 * @return String[] result of the underlying save call
	 */
	protected String[] upsertSfdcContacts(int numRecords) {
		final boolean insert = false; // upsert rather than insert
		final boolean ignoreOutput = true;
		final boolean negativeTest = false;
		return saveSfdcRecords(numRecords, 0, insert, ignoreOutput,
				negativeTest, new ContactObjectGetter());
	}

	/**
	 * Upserts numRecords valid accounts whose sequence numbers start at
	 * startingSeq. The save is requested with output ignored.
	 * 
	 * @param numRecords
	 *            number of accounts to upsert
	 * @param startingSeq
	 *            sequence number of the first generated account
	 * @return String[] result of the underlying save call
	 */
	protected String[] upsertSfdcAccounts(int numRecords, int startingSeq) {
		final boolean insert = false; // upsert rather than insert
		final boolean ignoreOutput = true;
		final boolean negativeTest = false;
		return saveSfdcRecords(numRecords, startingSeq, insert, ignoreOutput,
				negativeTest, new AccountObjectGetter());
	}

	/**
	 * Upserts numRecords accounts generated in negative-test mode, with
	 * sequence numbers starting at startingSeq. The save is requested with
	 * output ignored.
	 * 
	 * @param numRecords
	 *            number of accounts to upsert
	 * @param startingSeq
	 *            sequence number of the first generated account
	 * @return String[] result of the underlying save call
	 */
	protected String[] upsertBadSfdcAccounts(int numRecords, int startingSeq) {
		final boolean insert = false; // upsert rather than insert
		final boolean ignoreOutput = true;
		final boolean negativeTest = true;
		return saveSfdcRecords(numRecords, startingSeq, insert, ignoreOutput,
				negativeTest, new AccountObjectGetter());
	}

	/**
	 * Inserts numAccounts valid accounts, starting at sequence number 0.
	 * 
	 * @param numAccounts
	 *            number of accounts to insert
	 * @param ignoreOutput
	 *            passed through to the underlying save call
	 * @return String[] result of the underlying save call
	 */
	protected String[] insertSfdcAccounts(int numAccounts, boolean ignoreOutput) {
		final boolean insert = true;
		final boolean negativeTest = false;
		return saveSfdcRecords(numAccounts, 0, insert, ignoreOutput,
				negativeTest, new AccountObjectGetter());
	}

	/**
	 * Inserts numContacts valid contacts, starting at sequence number 0.
	 * 
	 * @param numContacts
	 *            number of contacts to insert
	 * @param ignoreOutput
	 *            passed through to the underlying save call
	 * @return String[] result of the underlying save call
	 */
	protected String[] insertSfdcContacts(int numContacts, boolean ignoreOutput) {
		final boolean insert = true;
		final boolean negativeTest = false;
		return saveSfdcRecords(numContacts, 0, insert, ignoreOutput,
				negativeTest, new ContactObjectGetter());
	}

	/**
	 * Inserts numAccounts valid accounts whose sequence numbers start at
	 * startingSeq.
	 * 
	 * @param numAccounts
	 *            number of accounts to insert
	 * @param startingSeq
	 *            sequence number of the first generated account
	 * @param ignoreOutput
	 *            passed through to the underlying save call
	 * @return String[] result of the underlying save call
	 */
	protected String[] insertSfdcAccounts(int numAccounts, int startingSeq,
			boolean ignoreOutput) {
		final boolean insert = true;
		final boolean negativeTest = false;
		return saveSfdcRecords(numAccounts, startingSeq, insert, ignoreOutput,
				negativeTest, new AccountObjectGetter());
	}

	/**
	 * Inserts or upserts numRecords generated records, splitting the work
	 * into batches of at most SAVE_RECORD_LIMIT records when necessary.
	 * 
	 * @param numRecords
	 *            total number of records to save
	 * @param startingSeq
	 *            sequence number passed to the generator for the first record
	 * @param insert
	 *            true to insert, false to upsert
	 * @param ignoreOutput
	 *            when true the ids are not collected and a one-element array
	 *            holding null is returned
	 * @param negativeTest
	 *            passed to the generator to request deliberately bad records
	 * @param objGetter
	 *            generator for the records to save
	 * @return when ignoreOutput is false, the ids of all saved records in
	 *         generation order; otherwise a one-element array
	 */
	private String[] saveSfdcRecords(int numRecords, int startingSeq,
			boolean insert, boolean ignoreOutput, boolean negativeTest,
			SObjectGetter objGetter) {
		final String entityName = objGetter.getEntityName();
		logger.info((insert ? "Inserting " : "Upserting ") + numRecords
				+ " total " + entityName + "s");

		// small request: everything fits into a single call
		if (numRecords < SAVE_RECORD_LIMIT) {
			SObject[] records = getSObjects(numRecords, startingSeq,
					negativeTest, objGetter);
			return insert ? insertSfdcRecords(records, ignoreOutput, 0)
					: upsertSfdcRecords(records, ignoreOutput, 0);
		}

		// only SAVE_RECORD_LIMIT records are allowed per operation, so save
		// in batches and collect all results for the caller
		String[] ids = ignoreOutput ? new String[1] : new String[numRecords];
		int nextIdIndex = 0; // position in ids where the next batch starts
		List<SObject> recordsToSave = new ArrayList<SObject>();
		for (int i = 0; i < numRecords; i++) {
			recordsToSave.add(objGetter
					.getObject(i + startingSeq, negativeTest));

			// flush when the current batch is full or the last record has
			// been generated
			if (i > 0 && (i + 1) % SAVE_RECORD_LIMIT == 0
					|| i == numRecords - 1) {
				SObject[] batch = recordsToSave
						.toArray(new SObject[recordsToSave.size()]);
				String[] savedIds = insert ? insertSfdcRecords(batch,
						ignoreOutput, 0) : upsertSfdcRecords(batch,
						ignoreOutput, 0);
				logger.info((insert ? "Inserted " : "Upserted ") + (i + 1)
						+ " of " + numRecords + " total " + entityName
						+ "s into SFDC");

				if (!ignoreOutput) {
					// append this batch's ids after those of earlier batches
					// (previously every id of a batch was written to the same
					// slot, leaving all but one entry per batch null)
					System.arraycopy(savedIds, 0, ids, nextIdIndex,
							savedIds.length);
					nextIdIndex += savedIds.length;
				}
				// start a new batch
				recordsToSave.clear();
			}
		}
		return ids;
	}

	/**
	 * Inserts the given records through the partner binding and fails the
	 * test if any record is rejected or the call cannot be made.
	 * 
	 * @param records
	 *            records to create
	 * @param ignoreOutput
	 *            when true a one-element array holding null is returned
	 *            instead of the ids
	 * @param retries
	 *            number of retries performed so far
	 * @return the ids of the created records, or a one-element array when
	 *         ignoreOutput is true
	 */
	private String[] insertSfdcRecords(SObject[] records, boolean ignoreOutput,
			int retries) {
		// get the client and make the insert call
		try {
			SaveResult[] results = getBinding().create(records);
			String[] ids = new String[results.length];
			for (int i = 0; i < results.length; i++) {
				SaveResult result = results[i];
				if (!result.getSuccess()) {
					fail("Insert returned an error: "
							+ result.getErrors()[0].getMessage());
				} else {
					ids[i] = result.getId();
				}
			}
			return ignoreOutput ? new String[1] : ids;
		} catch (ApiFault e) {
			// NOTE(review): a non-null checkBinding result is taken to mean
			// "binding re-established, retry" -- confirm against TestBase
			if (checkBinding(++retries, e) != null) {
				// return the retry's result; previously it was discarded and
				// the test failed even when the retry succeeded
				return insertSfdcRecords(records, ignoreOutput, retries);
			}
			fail("Error inserting records: " + e.getExceptionMessage());
		} catch (ConnectionException e) {
			fail("Error inserting records: " + e.getMessage());
		}
		// needed for the compiler; not reached because fail() throws
		return null;
	}

	/**
	 * Upserts the given records through the partner binding, keyed on the
	 * external id field configured in the controller
	 * ({@code Config.EXTERNAL_ID_FIELD}), and fails the test if any record is
	 * rejected or the call cannot be made.
	 * 
	 * @param records
	 *            records to upsert
	 * @param ignoreOutput
	 *            when true a one-element array holding null is returned
	 *            instead of the ids
	 * @param retries
	 *            number of retries performed so far
	 * @return the ids of the upserted records, or a one-element array when
	 *         ignoreOutput is true
	 */
	private String[] upsertSfdcRecords(SObject[] records, boolean ignoreOutput,
			int retries) {
		// get the client and make the upsert call
		try {
			UpsertResult[] results = getBinding().upsert(
					getController().getConfig().getString(
							Config.EXTERNAL_ID_FIELD), records);
			String[] ids = new String[results.length];
			for (int i = 0; i < results.length; i++) {
				UpsertResult result = results[i];
				if (!result.getSuccess()) {
					fail("Upsert returned an error: "
							+ result.getErrors()[0].getMessage());
				} else {
					ids[i] = result.getId();
				}
			}
			return ignoreOutput ? new String[1] : ids;
		} catch (ApiFault e) {
			// NOTE(review): a non-null checkBinding result is taken to mean
			// "binding re-established, retry" -- confirm against TestBase
			if (checkBinding(++retries, e) != null) {
				// return the retry's result; previously it was discarded and
				// the test failed even when the retry succeeded
				return upsertSfdcRecords(records, ignoreOutput, retries);
			}
			fail("Error upserting records: " + e.getExceptionMessage());
		} catch (ConnectionException e) {
			fail("Error upserting records: " + e.getMessage());
		}
		// needed for the compiler; not reached because fail() throws
		return null;
	}

	/**
	 * Generates numRecords records with consecutive sequence numbers.
	 * 
	 * @param numRecords
	 *            number of records to generate
	 * @param startingSeq
	 *            sequence number passed to the generator for the first record
	 * @param negativeTest
	 *            passed through to the generator
	 * @param objGetter
	 *            generator that builds each record
	 * @return Array of SObjects in sequence order
	 */
	private SObject[] getSObjects(int numRecords, int startingSeq,
			boolean negativeTest, SObjectGetter objGetter) {
		final SObject[] generated = new SObject[numRecords];
		int seq = startingSeq;
		for (int idx = 0; idx < generated.length; idx++, seq++) {
			generated[idx] = objGetter.getObject(seq, negativeTest);
		}
		return generated;
	}

	/**
	 * Generator of test records for a single entity type.
	 */
	interface SObjectGetter {
		/**
		 * @param i
		 *            sequence number of the record to build
		 * @param negativeTest
		 *            flag requesting a negative-test variant of the record
		 * @return the generated record
		 */
		SObject getObject(int i, boolean negativeTest);

		/**
		 * @return name of the entity this generator builds; used in log
		 *         messages
		 */
		String getEntityName();
	}

	/**
	 * Generates Account test records.
	 */
	class AccountObjectGetter implements SObjectGetter {
		/**
		 * Builds an account whose field values are derived from the
		 * zero-padded, six-digit sequence number.
		 * 
		 * @param i
		 *            sequence number of the account
		 * @param negativeTest
		 *            when true, a long suffix is appended to the account
		 *            number
		 * @return SObject account
		 */
		public SObject getObject(int i, boolean negativeTest) {
			final String seq = String.format("%06d", i);

			String accountNumber = ACCOUNT_NUMBER_PREFIX + seq;
			if (negativeTest) {
				// the dataloader test database doesn't accept long account
				// numbers (longer than 20 chars)
				accountNumber += "extraextraextraextraextraLongAccountNumber";
			}

			final SObject acct = new SObject();
			acct.setType("Account");
			acct.setField("Type", "Account");
			acct.setField("Name", "account insert#" + seq);
			acct.setField("AccountNumber__c", accountNumber);
			acct.setField("AnnualRevenue", 1000.0 * i);
			acct.setField("Phone", "415-555-" + seq);
			acct.setField("WebSite", "http://www.accountInsert" + seq + ".com");
			acct.setField(DEFAULT_ACCOUNT_EXT_ID_FIELD, "1-" + seq);
			return acct;
		}

		/**
		 * @return the entity name, "Account"
		 */
		public String getEntityName() {
			return "Account";
		}
	}

	/**
	 * Generates Contact test records.
	 */
	class ContactObjectGetter implements SObjectGetter {
		/**
		 * Builds a contact whose field values are derived from the
		 * zero-padded, six-digit sequence number.
		 * 
		 * @param i
		 *            sequence number of the contact
		 * @param negativeTest
		 *            when true, a long suffix is appended to the title
		 * @return SObject contact
		 */
		public SObject getObject(int i, boolean negativeTest) {
			final String seq = String.format("%06d", i);

			String title = CONTACT_TITLE_PREFIX + seq;
			if (negativeTest) {
				title += "extraextraextraextraextraextraLoongTitleextraextraextraextraextraextraLoongTitleextraextraextraextraextraextraLoongTitle";
			}

			final SObject person = new SObject();
			person.setType("Contact");
			person.setField("FirstName", "First " + seq);
			person.setField("LastName", "First " + seq);
			person.setField("Title", title);
			person.setField("Phone", "415-555-" + seq);
			person.setField(DEFAULT_CONTACT_EXT_ID_FIELD, (double) i);
			return person;
		}

		/**
		 * @return the entity name, "Contact"
		 */
		public String getEntityName() {
			return "Contact";
		}
	}

	/**
	 * Queries the ids of all records of the given entity that match the where
	 * clause and deletes them, following the query locator until the query
	 * result reports it is done. Fails the test if the query cannot be made.
	 * 
	 * @param entityName
	 *            name of the entity to query
	 * @param whereClause
	 *            SOQL condition, without the "where" keyword
	 * @param retries
	 *            number of retries performed so far
	 */
	protected void deleteSfdcRecords(String entityName, String whereClause,
			int retries) {
		try {
			// query for records
			String soql = "select Id from " + entityName + " where "
					+ whereClause;
			logger.info("Querying " + entityName + "s to delete with soql: "
					+ soql);
			QueryResult qryResult = getBinding().query(soql);

			int deletedCount = 0;
			deleteSfdcRecords(qryResult, 0);
			deletedCount += qryResult.getRecords().length;
			logger.info("Deleted " + qryResult.getRecords().length + " of "
					+ deletedCount + " total deleted records");
			if (deletedCount > 0) {
				// fetch and delete the remaining pages of the query result
				while (!qryResult.isDone()) {
					qryResult = getBinding().queryMore(
							qryResult.getQueryLocator());
					deleteSfdcRecords(qryResult, 0);
					deletedCount += qryResult.getRecords().length;
					logger.info("Deleted " + qryResult.getRecords().length
							+ " of " + deletedCount + " total deleted records");
				}
			}

		} catch (ApiFault e) {
			// NOTE(review): a non-null checkBinding result is taken to mean
			// "binding re-established, retry" -- confirm against TestBase
			if (checkBinding(++retries, e) != null) {
				deleteSfdcRecords(entityName, whereClause, retries);
				// the retry completed without failing; previously execution
				// fell through to fail() even after a successful retry
				return;
			}
			fail("Failed to query " + entityName + "s to delete ("
					+ whereClause + "), error: " + e.getExceptionMessage());
		} catch (ConnectionException e) {
			fail("Failed to query " + entityName + "s to delete ("
					+ whereClause + "), error: " + e.getMessage());
		}
	}

	/**
	 * Deletes every record contained in the given query result, in batches of
	 * at most SAVE_RECORD_LIMIT ids. Fails the test if any delete is rejected
	 * or the call cannot be made.
	 * 
	 * @param qryResult
	 *            query result whose records are deleted
	 * @param retries
	 *            number of retries performed so far
	 */
	protected void deleteSfdcRecords(QueryResult qryResult, int retries) {
		try {
			final SObject[] records = qryResult.getRecords();
			List<String> toDeleteIds = new ArrayList<String>();
			for (int i = 0; i < records.length; i++) {
				SObject record = records[i];
				logger.info("Deleting record id:" + record.getId());
				toDeleteIds.add(record.getId());
				// when SAVE_RECORD_LIMIT records are reached or
				// if we're on the last query result record, do the delete
				if (i > 0 && (i + 1) % SAVE_RECORD_LIMIT == 0
						|| i == records.length - 1) {
					DeleteResult[] delResults = getBinding().delete(
							toDeleteIds.toArray(new String[toDeleteIds.size()]));
					for (DeleteResult delResult : delResults) {
						if (!delResult.getSuccess()) {
							fail("Delete returned an error: "
									+ delResult.getErrors()[0].getMessage());
						}
					}
					toDeleteIds.clear();
				}
			}
		} catch (ApiFault e) {
			// NOTE(review): a non-null checkBinding result is taken to mean
			// "binding re-established, retry" -- confirm against TestBase
			if (checkBinding(++retries, e) != null) {
				deleteSfdcRecords(qryResult, retries);
				// the retry completed without failing; previously execution
				// fell through to fail() even after a successful retry
				return;
			}
			fail("Failed to delete records, error: " + e.getExceptionMessage());
		} catch (ConnectionException e) {
			fail("Failed to delete records, error: " + e.getMessage());
		}
	}

	/**
	 * Inserts test records for the rows of the template file and constructs
	 * the input csv file from the template rows, optionally writing the
	 * inserted ids into the "ID" column. Both file names are resolved against
	 * the test data directory. The test fails if the template has no rows or
	 * if either file cannot be accessed.
	 * 
	 * @param templateFileName
	 *            name of the template csv file
	 * @param inputFileName
	 *            name of the input csv file to write
	 * @param updateColName
	 *            when non-empty, the column that must be present and non-empty
	 *            in each used template row; its value is copied with "-" and
	 *            the current time in milliseconds appended
	 * @param setId
	 *            when true, the "ID" column of each input row is set to the id
	 *            of the corresponding inserted account
	 * @param maxInserts
	 *            upper bound on the number of records inserted; the smaller of
	 *            this value and the template row count is used
	 * @param invalidInput
	 *            when true, a row whose "ID" is not a valid id is written
	 *            before the other rows
	 * @param hetrogeneousIds
	 *            when true, one fewer account is inserted, one contact is
	 *            inserted as well, and a row holding the contact id is written
	 *            after the account rows
	 * @return String absolute path to the input file
	 */
	protected String convertTemplateToInput(String templateFileName,
			String inputFileName, String updateColName, boolean setId,
			int maxInserts, boolean invalidInput, boolean hetrogeneousIds) {
		String templatePath = new File(getTestDataDir(), templateFileName)
				.getAbsolutePath();
		CSVFileReader templateReader = new CSVFileReader(templatePath);
		String inputPath = new File(getTestDataDir(), inputFileName)
				.getAbsolutePath();
		CSVFileWriter inputWriter = new CSVFileWriter(inputPath);
		try {
			templateReader.open();
			inputWriter.open();
			List<Map<String, Object>> templateRows = templateReader
					.readRowList(templateReader.getTotalRows());
			List<Map<String, Object>> inputRows = new ArrayList<Map<String, Object>>();

			// verify that the template file is usable
			assertTrue("No data found in the template file: "
					+ templateFileName, (templateRows != null && templateRows
					.size() > 0));

			// figure out whether any columns need to be "updated"
			boolean needsUpdate = updateColName != null
					&& updateColName.length() > 0;
			// tracks whether the writer's column names have been set already
			boolean colNameSet = false;

			// insert accounts for the whole template or part of it if
			// maxInserts is smaller than the template size
			String[] insertedIds = null;
			String[] insertedContactIds = null;
			int insertedSize = Math.min(maxInserts, templateRows.size());
			if (invalidInput) {
				inputWriter.setColumnNames(templateReader.getColumnNames());
				colNameSet = true;
				// create a row of Invalid Input and write it right away so it
				// precedes the rows generated below
				Map<String, Object> invalidInputRow = new HashMap<String, Object>();
				String invalidid = "Iam not an id";
				invalidInputRow.put("ID", invalidid);
				// inputRows.add(invalidInputRow);
				inputWriter.writeRow(invalidInputRow);

			}
			if (insertedSize > 0) {
				if (hetrogeneousIds) {
					// mix of entity types: all but one record are accounts,
					// the remaining one is a contact

					insertedIds = insertSfdcAccounts(insertedSize - 1, false);
					insertedContactIds = insertSfdcContacts(1, false);
				} else {
					insertedIds = insertSfdcAccounts(insertedSize, false);
				}
				// produce input row for each inserted account, pairing it with
				// the template row at the same index
				for (int i = 0; i < insertedIds.length; i++) {
					Map<String, Object> templateRow = templateRows.get(i);
					if (needsUpdate) {
						assertTrue(updateColName
								+ " must be present in template file: "
								+ templatePath, templateRow
								.containsKey(updateColName));
						assertTrue(updateColName
								+ " cannot be empty in template file: "
								+ templatePath,
								templateRow.get(updateColName) != null
										&& templateRow.get(updateColName)
												.toString().length() > 0);
					}
					// create row for input appending timestamp to update column
					// and
					// optionally setting an id
					Map<String, Object> inputRow = new HashMap<String, Object>();

					for (Entry<String, Object> entry : templateRow.entrySet()) {
						String colName = entry.getKey();
						if (needsUpdate && colName.equals(updateColName)) {
							inputRow.put(colName, entry.getValue().toString()
									+ "-" + System.currentTimeMillis());
						} else {
							inputRow.put(entry.getKey(), entry.getValue());
						}
					}
					// set id if id's need to be set
					if (setId) {
						inputRow.put("ID", insertedIds[i]);
					}
					inputRows.add(inputRow);
				}
				if (!colNameSet) {
					inputWriter.setColumnNames(templateReader.getColumnNames());
				}
				inputWriter.writeRowList(inputRows);

				if (hetrogeneousIds) {
					// create a row for Contact ids; written after the account
					// rows
					Map<String, Object> contactRow = new HashMap<String, Object>();
					contactRow.put("ID", insertedContactIds[0]);
					inputWriter.writeRow(contactRow);

				}

			}
		} catch (DataAccessObjectException e) {
			fail("Failed to create input data: " + inputPath
					+ " from template: " + templatePath + ", error: "
					+ e.getMessage());
		} finally {
			templateReader.close();
			inputWriter.close();
		}
		return inputPath;
	}

	/**
	 * Creates a csv input file in the test data directory by writing a single
	 * row whose "ID" value is the empty string. The test fails if the file
	 * cannot be written.
	 * 
	 * @param inputFileName
	 *            name of the file to create
	 * @return String absolute path of the created file
	 */
	protected String createEmptyInput(String inputFileName) {
		final String inputPath = new File(getTestDataDir(), inputFileName)
				.getAbsolutePath();
		final CSVFileWriter writer = new CSVFileWriter(inputPath);
		try {
			writer.open();
			Map<String, Object> emptyIdRow = new HashMap<String, Object>();
			emptyIdRow.put("ID", "");
			writer.writeRow(emptyIdRow);
		} catch (DataAccessObjectException e) {
			fail("Failed to create input data: " + inputPath + ", error: "
					+ e.getMessage());
		} finally {
			writer.close();
		}
		return inputPath;
	}

	/**
	 * Builds the configuration for a process test on top of the superclass
	 * test configuration. The mapping file is resolved in the test data
	 * directory, and the success and error files in the test status
	 * directory; all three are named after the test's base name.
	 * 
	 * @param op
	 *            value for {@code Config.OPERATION}
	 * @param daoName
	 *            value for {@code Config.DAO_NAME}
	 * @param isWrite
	 *            true to use the csv write DAO type, false for csv read
	 * @return the populated configuration map
	 */
	protected Map<String, String> getTestConfig(String op, String daoName,
			boolean isWrite) {
		final String mappingFile = new File(getTestDataDir(), baseName
				+ "Map.sdl").getAbsolutePath();
		final String successFile = new File(getTestStatusDir(), baseName
				+ "Success.csv").getAbsolutePath();
		final String errorFile = new File(getTestStatusDir(), baseName
				+ "Error.csv").getAbsolutePath();

		String daoType;
		if (isWrite) {
			daoType = DataAccessObjectFactory.CSV_WRITE_TYPE;
		} else {
			daoType = DataAccessObjectFactory.CSV_READ_TYPE;
		}

		final Map<String, String> testConfig = super.getTestConfig();
		testConfig.put(Config.MAPPING_FILE, mappingFile);
		testConfig.put(Config.OPERATION, op);
		testConfig.put(Config.DAO_NAME, daoName);
		testConfig.put(Config.DAO_TYPE, daoType);
		testConfig.put(Config.OUTPUT_STATUS_DIR, getTestStatusDir());
		testConfig.put(Config.OUTPUT_SUCCESS, successFile);
		testConfig.put(Config.OUTPUT_ERROR, errorFile);
		return testConfig;
	}

	/**
	 * Builds the process test configuration using the csv file named after
	 * the test's base name, located in the test data directory, as the DAO
	 * name.
	 * 
	 * @param op
	 *            value for {@code Config.OPERATION}
	 * @param isWrite
	 *            true to use the csv write DAO type, false for csv read
	 * @return the populated configuration map
	 */
	protected Map<String, String> getTestConfig(String op, boolean isWrite) {
		final File defaultCsv = new File(getTestDataDir(), baseName + ".csv");
		return getTestConfig(op, defaultCsv.getAbsolutePath(), isWrite);
	}

	/**
	 * Runs a delete-style process test: inserts accounts for every row of the
	 * test's template file, writes their ids into the test's csv input file,
	 * runs the process and verifies that it produced no errors and that all
	 * rows succeeded with matching ids.
	 * 
	 * @param argMap
	 *            process arguments passed to the process runner
	 * @throws Exception
	 *             if the process cannot be initialized or run
	 */
	protected void runDeleteAccountCsvTest(Map<String, String> argMap)
			throws Exception {
		// insert some account records so that there is data to delete
		final String templateName = baseName + "Template.csv";
		final String inputName = baseName + ".csv";
		final boolean setId = true;
		final boolean invalidInput = false;
		final boolean hetrogeneousIds = false;
		final String deleteFileName = convertTemplateToInput(templateName,
				inputName, "", setId, Integer.MAX_VALUE, invalidInput,
				hetrogeneousIds);

		final Controller controller = runProcess(argMap);

		// there must be no errors and every record must have succeeded
		verifyNoError(controller);
		verifyAllSuccess(controller, deleteFileName);
	}

	/**
	 * Creates a process runner for the given arguments, names it after the
	 * test's base name, runs it and returns its controller.
	 * 
	 * @param argMap
	 *            process arguments passed to the process runner
	 * @return the controller obtained from the runner after the run
	 * @throws ProcessInitializationException
	 *             if the process runner cannot be created
	 */
	protected Controller runProcess(Map<String, String> argMap)
			throws ProcessInitializationException {
		final ProcessRunner runner = ProcessRunner.getInstance(argMap);
		runner.setName(baseName);
		runner.run();
		// hand back the controller created for this process
		return runner.getController();
	}

}
